932ecd9a8bc3feade0b7c0712dac6ed393231b95,test/src/test/java/org/corfudb/runtime/object/transactions/OptimisticTXConcurrencyTest.java,OptimisticTXConcurrencyTest,testOptimism,#,171
Before Change
assertThat(numTasks).isGreaterThan(1); // don't change concurrency to less than 2, test will break
// a state-machine:
ArrayList<BiConsumer<Integer, Integer>> stateMachine = new ArrayList<BiConsumer<Integer, Integer>>();
// SM step 1: start an optimistic transaction
stateMachine.add((Integer ignored_thread_num, Integer ignored_task_num) -> {
TXBegin();
});
// SM step 2: task k modifies counter k
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
sharedCounters.get(task_num).setValue(OVERWRITE_ONCE);
});
// SM step 3: task k reads counter k+1
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
.isBetween(INITIAL, OVERWRITE_ONCE);
});
// SM step 4: task k verifies opacity, checking that it can read its own modified value of counter k
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
assertThat(sharedCounters.get(task_num).getValue())
.isEqualTo(OVERWRITE_ONCE);
});
// SM step 5: task k overwrites counter k+1
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
sharedCounters.get((task_num+1)%numTasks).setValue(OVERWRITE_TWICE);
} );
// SM step 6: task k checks opacity again, reading its own modified value, this time of counter k+1
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
.isEqualTo(OVERWRITE_TWICE);
});
// SM step 7: each thread again verifies opacity, checking that it can re-read counter k
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
assertThat(sharedCounters.get(task_num).getValue())
.isEqualTo(OVERWRITE_ONCE);
});
// SM step 8: try to commit all transactions;
// task k aborts only if one or both of (k-1), (k+1) committed before it
stateMachine.add((Integer ignored_thread_num, Integer task_num) -> {
try {
TXEnd();
commitStatus.set(task_num, COMMITVALUE);
} catch (TransactionAbortedException tae) {
assertThat(commitStatus.get((task_num + 1) % numTasks) == COMMITVALUE ||
commitStatus.get((task_num - 1) % numTasks) == COMMITVALUE)
.isTrue();
}
} );
// invoke the interleaving engine
scheduleInterleaved(PARAMETERS.CONCURRENCY_SOME, numTasks, stateMachine);
}
void TXEnd() {
After Change
});
// SM step 2: task k modifies counter k
addTestStep((task_num) -> {
sharedCounters.get(task_num).setValue(OVERWRITE_ONCE);
});
// SM step 3: task k reads counter k+1
addTestStep((task_num) -> {
assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
.isBetween(INITIAL, OVERWRITE_ONCE);
});
// SM step 4: task k verifies opacity, checking that it can read its own modified value of counter k
addTestStep((task_num) -> {
assertThat(sharedCounters.get(task_num).getValue())
.isEqualTo(OVERWRITE_ONCE);
});
// SM step 5: task k overwrites counter k+1
addTestStep((task_num) -> {
sharedCounters.get((task_num + 1) % numTasks).setValue(OVERWRITE_TWICE);
});
// SM step 6: task k checks opacity again, reading its own modified value, this time of counter k+1
addTestStep((task_num) -> {
assertThat(sharedCounters.get((task_num + 1) % numTasks).getValue())
.isEqualTo(OVERWRITE_TWICE);
});
// SM step 7: each thread again verifies opacity, checking that it can re-read counter k
addTestStep((task_num) -> {
assertThat(sharedCounters.get(task_num).getValue())
.isEqualTo(OVERWRITE_ONCE);
});
// SM step 8: try to commit all transactions;
// task k aborts only if one or both of (k-1), (k+1) committed before it
addTestStep((task_num) -> {
try {
TXEnd();
commitStatus.set(task_num, COMMITVALUE);
} catch (TransactionAbortedException tae) {
// do nothing
}
});
// invoke the execution engine
if (testInterleaved)
scheduleInterleaved(PARAMETERS.CONCURRENCY_SOME, numTasks);
else
scheduleThreaded(PARAMETERS.CONCURRENCY_SOME, numTasks);
// verify that all aborts are justified
for (int task_num = 0; task_num < numTasks; task_num++) {